{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Handle Evolving Workflows\n",
    "=========================\n",
    "\n",
    "For some workflows we don't know the extent of the computation at the outset.  We need to do some computation in order to figure out the rest of the computation that we need to do.  The computation grows and evolves as we do more work.\n",
    "\n",
    "As an example, consider a situation where you need to read many files and then based on the contents of those files, fire off additional work.  You would like to read the files in parallel, and then within each file expose more parallelism.\n",
    "\n",
    "This example goes through three ways to handle this situation using [Dask Futures](https://docs.dask.org/en/latest/futures.html)\n",
    "\n",
    "1.  Using `as_completed`\n",
    "2.  Using `async/await`\n",
    "3.  Launching tasks from tasks\n",
    "\n",
    "But first, lets run our code sequentially."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "0: Sequential code\n",
    "------------------"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['file.0.txt', 'file.1.txt', 'file.2.txt']"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "filenames = [\"file.{}.txt\".format(i) for i in range(10)]\n",
    "\n",
    "filenames[:3]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random, time\n",
    "\n",
    "\n",
    "def parse_file(fn: str) -> list:\n",
    "    \"\"\" Returns a list work items of unknown length \"\"\"\n",
    "    time.sleep(random.random())\n",
    "    return [random.random() for _ in range(random.randint(1, 10))]\n",
    "\n",
    "def process_item(x: float):\n",
    "    \"\"\" Process each work item \"\"\"\n",
    "    time.sleep(random.random() / 4)\n",
    "    return x + 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 2.89 ms, sys: 2.15 ms, total: 5.04 ms\n",
      "Wall time: 12.7 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "# This takes around 10-20s\n",
    "\n",
    "results = []\n",
    "\n",
    "for fn in filenames:\n",
    "    L = parse_file(fn)\n",
    "    for x in L:\n",
    "        out = process_item(x)\n",
    "        results.append(out)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start Dask Client\n",
    "-----------------\n",
    "\n",
    "We'll need a Dask client in order to manage dynamic workloads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Client</h3>\n",
       "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
       "  <li><b>Scheduler: </b>inproc://192.168.1.88/22793/1</li>\n",
       "  <li><b>Dashboard: </b><a href='http://192.168.1.88/22793/1:8787/status' target='_blank'>http://192.168.1.88/22793/1:8787/status</a>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Cluster</h3>\n",
       "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
       "  <li><b>Workers: </b>1</li>\n",
       "  <li><b>Cores: </b>6</li>\n",
       "  <li><b>Memory: </b>17.18 GB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: 'inproc://192.168.1.88/22793/1' processes=1 threads=6, memory=17.18 GB>"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dask.distributed import Client\n",
    "\n",
    "client = Client(processes=False, n_workers=1, threads_per_worker=6)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "1: Use as_completed\n",
    "-------------------\n",
    "\n",
    "The [as_completed](https://docs.dask.org/en/latest/futures.html#distributed.as_completed) iterator lets us handle futures as they complete.  We can then submit more data on the fly.\n",
    "\n",
    "-   We submit a task for each of our filenames\n",
    "-   We also compute the length of each of the returned lists\n",
    "-   As those lengths return, we submit off a new task to get each item of that list.  We do this at higher priority, so that we process existing data before we collect new data.\n",
    "-   We wait on all of the returned results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 479 ms, sys: 42.1 ms, total: 521 ms\n",
      "Wall time: 1.99 s\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[1.3111611115161288,\n",
       " 1.6710683157391781,\n",
       " 1.7164428468748616,\n",
       " 1.800622553076163,\n",
       " 1.727166530465721,\n",
       " 1.2435391897040307,\n",
       " 1.7056947623387475,\n",
       " 1.0596045673985244,\n",
       " 1.8857891985999329,\n",
       " 1.6158345289423508,\n",
       " 1.919814401986857,\n",
       " 1.395797021070106,\n",
       " 1.2328607075558677,\n",
       " 1.8716573039246027,\n",
       " 1.1932916116625059,\n",
       " 1.8603989952102773,\n",
       " 1.0328788488139926,\n",
       " 1.2028754763374359,\n",
       " 1.1320963319137496,\n",
       " 1.721696627844525,\n",
       " 1.3847105665467672,\n",
       " 1.5209424563772096,\n",
       " 1.3011548414159444,\n",
       " 1.0068324689443795,\n",
       " 1.7988131312639295,\n",
       " 1.623488584383071,\n",
       " 1.3647231496854348,\n",
       " 1.766826156334631,\n",
       " 1.389170976744607,\n",
       " 1.5207986950905719,\n",
       " 1.4630442325144821,\n",
       " 1.0928294302684436,\n",
       " 1.158048714608169,\n",
       " 1.5057672112122034,\n",
       " 1.3051738493142957,\n",
       " 1.7915686291889927,\n",
       " 1.4641025730591553,\n",
       " 1.5689841130675308,\n",
       " 1.8939205181190344,\n",
       " 1.4897614212992683,\n",
       " 1.837505474832139,\n",
       " 1.6853162616031758,\n",
       " 1.5703147725335822,\n",
       " 1.324593341124221,\n",
       " 1.9752197685334623,\n",
       " 1.9836642244658802,\n",
       " 1.1166666932969158,\n",
       " 1.4426310014217711,\n",
       " 1.981323244271089,\n",
       " 1.1029521014180235]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "from dask.distributed import as_completed\n",
    "import operator\n",
    "\n",
    "lists = client.map(parse_file, filenames, pure=False)\n",
    "lengths = client.map(len, lists)\n",
    "\n",
    "mapping = dict(zip(lengths, lists))\n",
    "\n",
    "futures = []\n",
    "\n",
    "for future in as_completed(lengths):\n",
    "    n = future.result()\n",
    "    L = mapping[future]\n",
    "    for i in range(n):\n",
    "        new = client.submit(operator.getitem, L, i, priority=1)\n",
    "        new = client.submit(process_item, new, priority=1)\n",
    "        futures.append(new)\n",
    "        \n",
    "client.gather(futures)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2: Use async/await to handle single file processing locally\n",
    "-----------------------------------------------------------\n",
    "\n",
    "We can also handle the concurrency here within our local process.  This requires you to understand async/await syntax, but is generally powerful and arguably simpler than the `as_completed` approach above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "async def f(fn):\n",
    "    \"\"\" Handle the lifecycle of a single file \"\"\"\n",
    "    future = client.submit(parse_file, fn, pure=False)\n",
    "    length_future = client.submit(len, future)\n",
    "    length = await length_future\n",
    "    \n",
    "    futures = [client.submit(operator.getitem, future, i, priority=10) \n",
    "               for i in range(length)]\n",
    "    futures = client.map(process_item, futures, priority=10)\n",
    "    return futures\n",
    "\n",
    "async def run_all(filenames):\n",
    "    list_of_list_of_futures = await asyncio.gather(*[f(fn) for fn in filenames])\n",
    "    futures = sum(list_of_list_of_futures, [])\n",
    "    return await client.gather(futures)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We now need to run this function in the same event loop as our client is running.  If we had started our client asynchronously, then we could have done this:\n",
    "\n",
    "```python\n",
    "client = await Client(asynchronous=True)\n",
    "\n",
    "await run_all(filenames)\n",
    "```\n",
    "\n",
    "However, because we started our client without the `asynchronous=True` flag the event loop is actually running in a separate thread, so we'll have to ask the client to run this for us."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1.357297484443608,\n",
       " 1.459251410204741,\n",
       " 1.017850718204474,\n",
       " 1.0103207961577314,\n",
       " 1.930213720287629,\n",
       " 1.2857328191406316,\n",
       " 1.5820068867733315,\n",
       " 1.235126464814356,\n",
       " 1.3017973825770213,\n",
       " 1.280494353530698,\n",
       " 1.2621753092912116,\n",
       " 1.0692221690014656,\n",
       " 1.3409450020077975,\n",
       " 1.76095194793966,\n",
       " 1.406913349808808,\n",
       " 1.1629781372157746,\n",
       " 1.284890053291389,\n",
       " 1.296110865907317,\n",
       " 1.6043190271860084,\n",
       " 1.6183468236638476,\n",
       " 1.0371823069625306,\n",
       " 1.1440474354383736,\n",
       " 1.2700546868202787,\n",
       " 1.4251602095333846,\n",
       " 1.8081116247079962,\n",
       " 1.3095577060322032,\n",
       " 1.378127448760599,\n",
       " 1.4198662907544803,\n",
       " 1.289672077527427,\n",
       " 1.3311575824239048,\n",
       " 1.653425104334226,\n",
       " 1.3210495246312448,\n",
       " 1.4387470035896208,\n",
       " 1.1943843706156694,\n",
       " 1.3371702554982288,\n",
       " 1.402034603527444,\n",
       " 1.986081438672693,\n",
       " 1.727300297178989,\n",
       " 1.0947838765328108]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "client.sync(run_all, filenames)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "3: Submit tasks from tasks\n",
    "--------------------------\n",
    "\n",
    "We can also submit tasks that themselves submit more tasks.  See [documentation here](https://docs.dask.org/en/latest/futures.html#submit-tasks-from-tasks)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 505 ms, sys: 44.8 ms, total: 550 ms\n",
      "Wall time: 2.44 s\n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "from dask.distributed import get_client, secede, rejoin\n",
    "\n",
    "def f(fn):\n",
    "    L = parse_file(fn)\n",
    "    client = get_client()\n",
    "    \n",
    "    futures = client.map(process_item, L, priority=10)\n",
    "    secede()\n",
    "    results = client.gather(futures)\n",
    "    rejoin()\n",
    "    return results\n",
    "\n",
    "futures = client.map(f, filenames, pure=False)\n",
    "results = client.gather(futures)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
